# Code Snippets and Line-by-Line Explanation
## Table of Contents
1. [Custom MCP Server Implementation](#custom-mcp-server-implementation)
2. [Public MCP Server Implementation](#public-mcp-server-implementation)
3. [Streamlit Dashboard Implementation](#streamlit-dashboard-implementation)
4. [Tool Integration System](#tool-integration-system)
5. [Statistics and Monitoring](#statistics-and-monitoring)
6. [Configuration Management](#configuration-management)
7. [Error Handling Patterns](#error-handling-patterns)
8. [Threading and Concurrency](#threading-and-concurrency)
---
## Custom MCP Server Implementation
### Flask Server Setup (`custom_mcp/server.py`)
```python
# Lines 1-11: Environment and imports setup
import logging
from dotenv import load_dotenv
import os
from flask import Flask, request, jsonify
# Load environment variables from .env file
dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
load_dotenv(dotenv_path)
from custom_mcp.mcp_controller import MCPController
```
**Explanation:**
- **Lines 1-5**: Import essential libraries for web server, logging, and environment management
- **Lines 7-9**: Construct path to `.env` file located three directories up from current file
- **Line 10**: Import the main controller class that handles business logic
```python
# Lines 13-21: Application initialization
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
app = Flask(__name__)
controller = MCPController()
```
**Explanation:**
- **Lines 14-16**: Configure logging with timestamp, level, and message format
- **Line 19**: Create Flask application instance
- **Line 20**: Initialize MCP controller for handling tasks and AI interactions
### Task Creation Endpoint
```python
# Lines 27-35: Task creation endpoint
@app.route("/task", methods=["POST"])
def create_task():
    payload = request.json or {}
    logging.info("POST /task payload: %s", payload)
    task_id = controller.create_task(
        payload.get("input", ""),
        payload.get("tools", [])
    )
    return jsonify({"task_id": task_id}), 201
```
**Explanation:**
- **Line 27**: Define POST endpoint for task creation
- **Line 29**: Extract JSON payload with fallback to empty dict
- **Line 30**: Log incoming request for debugging
- **Lines 31-34**: Call controller to create task with input text and tools list
- **Line 35**: Return task ID with HTTP 201 (Created) status
### Task Execution Endpoint
```python
# Lines 37-41: Task execution endpoint
@app.route("/task/<task_id>/run", methods=["POST"])
def run_task(task_id):
    logging.info("POST /task/%s/run", task_id)
    result = controller.run(task_id)
    return jsonify(result)
```
**Explanation:**
- **Line 37**: Define POST endpoint with task_id parameter in URL
- **Line 39**: Log task execution request
- **Line 40**: Execute task through controller
- **Line 41**: Return execution result as JSON
---
## MCP Controller Logic (`custom_mcp/mcp_controller.py`)
### Initialization and Threading
```python
# Lines 16-28: Controller initialization with thread safety
class MCPController:
    def __init__(self):
        self.tasks = {}
        self.queries_processed = 0
        self.total_response_time = 0.0
        self.successful_queries = 0
        self.failed_queries = 0
        self.session_start_time = time.time()
        self.lock = threading.Lock()
        logging.info("MCPController initialized")
```
**Explanation:**
- **Line 21**: Dictionary to store active tasks by ID
- **Lines 22-25**: Statistics counters for monitoring
- **Line 26**: Record session start time for uptime calculation
- **Line 27**: Threading lock for thread-safe statistics updates
- **Line 28**: Log successful initialization
### Task Creation with UUID
```python
# Lines 30-35: Task creation with unique ID generation
def create_task(self, user_input: str, tools: list[str]) -> str:
    task_id = str(uuid.uuid4())
    self.tasks[task_id] = {"input": user_input, "tools": tools}
    logging.info("Created task %s input=%r tools=%s",
                 task_id, user_input, tools)
    return task_id
```
**Explanation:**
- **Line 31**: Generate unique UUID for task identification
- **Line 32**: Store task data in memory dictionary
- **Lines 33-34**: Log task creation with details
- **Line 35**: Return task ID for client reference
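The same in-memory pattern can be sketched standalone (the names here are illustrative, not taken from the project):

```python
import uuid

# Minimal in-memory task store mirroring the controller's approach
tasks: dict[str, dict] = {}

def create_task(user_input: str, tools: list[str]) -> str:
    # uuid4 is random, so IDs are unique without any coordination between clients
    task_id = str(uuid.uuid4())
    tasks[task_id] = {"input": user_input, "tools": tools}
    return task_id

task_id = create_task("hello world", ["sample_tool"])
```

The string form of a UUID4 is always 36 characters (32 hex digits plus four hyphens), which makes it a convenient opaque key for clients to hold.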
### AI Integration and Tool Processing
```python
# Lines 44-48: Tool integration logic
task = self.tasks[task_id]
text = task["input"]
if "sample_tool" in task["tools"]:
    text = sample_tool(text)
    logging.info("After sample_tool: %r", text)
```
**Explanation:**
- **Line 44**: Retrieve task data from storage
- **Line 45**: Extract input text
- **Line 46**: Check if sample_tool is requested
- **Line 47**: Apply tool transformation (string reversal)
- **Line 48**: Log tool output for debugging
```python
# Lines 50-63: Gemini API integration with error handling
prompt = f"Process the input: {text}"
logging.info("Sending to Gemini: %s", prompt)
start_time = time.time()
try:
    resp = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt
    )
    output = resp.text
    logging.info("Gemini output: %r", output)
    with self.lock:
        self.queries_processed += 1
        self.successful_queries += 1
        self.total_response_time += (time.time() - start_time)
```
**Explanation:**
- **Line 50**: Format prompt for AI processing
- **Line 52**: Record start time for response time calculation
- **Lines 54-57**: Call Gemini API with specified model
- **Line 58**: Extract text response
- **Lines 60-63**: Thread-safe statistics update for successful queries
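The start/stop timing bookkeeping around the API call reduces to a small reusable pattern, sketched here with a stand-in for the Gemini call (`timed` is a hypothetical helper, not part of the project):

```python
import time

def timed(fn, *args):
    """Run fn and return (result, elapsed_seconds) --
    the same wall-clock bookkeeping the controller does around generate_content."""
    start = time.time()
    result = fn(*args)
    return result, time.time() - start

result, elapsed = timed(str.upper, "prompt")
```

Accumulating `elapsed` into a running total, as the controller does, lets the average response time be derived later without storing per-query samples.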
---
## Public MCP Server Implementation
### Statistics Tracking with Date Management
```python
# Lines 48-58: Global statistics with date tracking
stats_data = {
    "active_sessions": 0,
    "queries_processed": 0,
    "total_response_time": 0.0,
    "success_count": 0,
    "failure_count": 0,
    "todays_queries": 0,
    "last_query_date": date.today(),
    "start_time": time.time()
}
```
**Explanation:**
- **Lines 49-57**: Global dictionary storing various metrics
- **Line 56**: Track today's queries separately
- **Line 57**: Store last query date for daily reset
- **Line 58**: Record server start time
### Daily Query Reset Logic
```python
# Lines 89-92: Daily query reset mechanism
if stats_data["last_query_date"] != date.today():
    stats_data["todays_queries"] = 0
    stats_data["last_query_date"] = date.today()
```
**Explanation:**
- **Line 89**: Check if date has changed since last query
- **Line 90**: Reset today's query count to zero
- **Line 91**: Update last query date to current date
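The reset check can be isolated into a testable function by passing "today" in as a parameter instead of calling `date.today()` directly (a sketch with illustrative names, not the project's code):

```python
from datetime import date

def maybe_reset_daily(stats: dict, today: date) -> None:
    """Zero the daily counter when the stored date is stale, mirroring the server's check."""
    if stats["last_query_date"] != today:
        stats["todays_queries"] = 0
        stats["last_query_date"] = today

stats = {"todays_queries": 7, "last_query_date": date(2024, 1, 1)}
maybe_reset_daily(stats, date(2024, 1, 2))  # date changed: counter resets
```

Parameterizing the date also makes the day-rollover behavior exercisable in tests without waiting for midnight.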
### AI Query Processing with Statistics
```python
# Lines 106-140: Complete query processing with stats
@app.route("/ask", methods=["POST"])
def ask_agent():
    payload = request.json or {}
    query = payload.get("query", "")
    logging.info("POST /ask query: %r", query)
    start_time = time.time()
    success = False
    try:
        resp = client.models.generate_content(
            model=cfg["model"],
            contents=query
        )
        text = resp.text
        success = True
        logging.info("Gemini response: %r", text)
        return jsonify({"response": text})
    except Exception as e:
        logging.exception("Gemini call failed")
        return jsonify({"error": str(e)}), 500
    finally:
        elapsed = time.time() - start_time
        with stats_lock:
            stats_data["queries_processed"] += 1
            stats_data["total_response_time"] += elapsed
            if success:
                stats_data["success_count"] += 1
            else:
                stats_data["failure_count"] += 1
```
**Explanation:**
- **Lines 107-111**: Extract query and start timing
- **Lines 113-117**: Call Gemini API with configured model
- **Lines 118-121**: Handle successful response
- **Lines 122-124**: Handle API errors with 500 status
- **Lines 125-133**: Update statistics regardless of success/failure
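The key idea is that the `finally` block runs on both the return path and the exception path, so the counters always advance exactly once per request. Stripped of Flask and Gemini, the pattern looks like this (names are illustrative):

```python
import threading
import time

stats = {"queries_processed": 0, "success_count": 0,
         "failure_count": 0, "total_response_time": 0.0}
stats_lock = threading.Lock()

def record_query(fn):
    """Run fn, updating counters in finally so they advance on success and failure alike."""
    start = time.time()
    success = False
    try:
        result = fn()
        success = True
        return result
    finally:
        with stats_lock:
            stats["queries_processed"] += 1
            stats["total_response_time"] += time.time() - start
            if success:
                stats["success_count"] += 1
            else:
                stats["failure_count"] += 1

record_query(lambda: "ok")          # success path
try:
    record_query(lambda: 1 / 0)     # failure path: the exception still propagates
except ZeroDivisionError:
    pass
```

Setting `success = True` only after the call completes is what distinguishes the two paths without needing any code in the `except` branch.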
---
## 🎨 Streamlit Dashboard Implementation
### Page Configuration and CSS Injection
```python
# Lines 5-11: Streamlit page configuration
st.set_page_config(
    page_title="Agentic AI Demo",
    page_icon="๐",
    layout="wide",
    initial_sidebar_state="collapsed"
)
```
**Explanation:**
- **Line 6**: Set browser tab title
- **Line 7**: Set favicon emoji
- **Line 8**: Use wide layout for better space utilization
- **Line 9**: Hide sidebar by default for cleaner interface
### Advanced CSS Styling
```css
/* Lines 27-31: Main application background */
.stApp {
    background: #1e3c72;
    font-family: "Inter", sans-serif;
    min-height: 100vh;
}
```
**Explanation:**
- **Line 28**: Set solid deep-blue background color
- **Line 29**: Use modern Inter font family
- **Line 30**: Ensure full viewport height
```css
/* Lines 34-51: Animated particle background */
.stApp::before {
    content: '';
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background-image:
        radial-gradient(2px 2px at 20px 30px, rgba(255,255,255,0.3), transparent),
        radial-gradient(2px 2px at 40px 70px, rgba(255,255,255,0.2), transparent),
        /* ... more gradients ... */
    background-repeat: repeat;
    background-size: 200px 100px;
    z-index: -1;
    animation: floatParticles 20s linear infinite;
}
```
**Explanation:**
- **Lines 35-40**: Create fixed overlay for particles
- **Lines 41-46**: Multiple radial gradients for particle effect
- **Line 50**: 20-second infinite animation cycle
### Glassmorphism Effects
```css
/* Lines 98-105: Main content glassmorphism */
.main-content {
    background: rgba(255, 255, 255, 0.1);
    backdrop-filter: blur(20px);
    border-radius: 20px;
    padding: 1.5rem;
    box-shadow: 0 8px 32px rgba(31, 38, 135, 0.37);
    border: 1px solid rgba(255, 255, 255, 0.18);
    animation: slideUp 0.8s ease-out;
}
```
**Explanation:**
- **Line 99**: Semi-transparent white background
- **Line 100**: Blur effect for glassmorphism
- **Line 103**: Subtle shadow for depth
- **Line 104**: Semi-transparent border
- **Line 105**: Slide-up animation on load
---
## 🛠️ Tool Integration System
### Sample Tool Implementation
```python
# Lines 1-7: Simple string reversal tool
import logging
def sample_tool(text: str) -> str:
    logging.info("sample_tool received: %s", text)
    result = text[::-1]  # reverse the string
    logging.info("sample_tool output: %s", result)
    return result
```
**Explanation:**
- **Line 4**: Log input for debugging
- **Line 5**: Reverse string using Python slice notation
- **Line 6**: Log output for verification
- **Line 7**: Return transformed text
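Stripped of the logging, the transformation is a single slice with step `-1`; applying it twice round-trips the input:

```python
def sample_tool(text: str) -> str:
    return text[::-1]  # a step of -1 walks the sequence backwards

assert sample_tool("hello") == "olleh"
assert sample_tool(sample_tool("round trip")) == "round trip"
```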
### Tool Integration in Controller
```python
# Lines 46-48: Tool execution logic
if "sample_tool" in task["tools"]:
    text = sample_tool(text)
    logging.info("After sample_tool: %r", text)
```
**Explanation:**
- **Line 46**: Check if specific tool is requested
- **Line 47**: Apply tool transformation to input text
- **Line 48**: Log result for debugging
---
## Statistics and Monitoring
### Thread-Safe Statistics Updates
```python
# Lines 60-63: Thread-safe success statistics
with self.lock:
    self.queries_processed += 1
    self.successful_queries += 1
    self.total_response_time += (time.time() - start_time)
```
**Explanation:**
- **Line 60**: Acquire lock for thread safety
- **Line 61**: Increment total query counter
- **Line 62**: Increment success counter
- **Line 63**: Add response time to total
### Statistics Calculation
```python
# Lines 74-85: Statistics calculation and formatting
def get_stats(self):
    with self.lock:
        elapsed = time.time() - self.session_start_time
        avg_response = (self.total_response_time / self.queries_processed) if self.queries_processed else 0.0
        success_rate = (self.successful_queries / self.queries_processed * 100) if self.queries_processed else 0.0
        return {
            "active_sessions": 1,
            "queries_processed": self.queries_processed,
            "response_time": round(avg_response, 2),
            "success_rate": round(success_rate, 2),
            "todays_queries": self.queries_processed,
            "uptime": round(elapsed / 60, 2)
        }
```
**Explanation:**
- **Line 76**: Calculate total uptime in seconds
- **Line 77**: Calculate average response time with division by zero protection
- **Line 78**: Calculate success rate as percentage
- **Lines 79-85**: Return formatted statistics dictionary
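The division-by-zero guards can be exercised in isolation (a sketch with illustrative names, not the project's method):

```python
def summarize(queries: int, total_time: float, successes: int) -> dict:
    """Average response time and success rate with the same zero-query guards as get_stats."""
    avg = (total_time / queries) if queries else 0.0
    rate = (successes / queries * 100) if queries else 0.0
    return {"response_time": round(avg, 2), "success_rate": round(rate, 2)}

empty = summarize(0, 0.0, 0)   # no queries yet: both metrics are 0.0, not an exception
busy = summarize(4, 6.0, 3)    # 4 queries in 6s, 3 succeeded
```

The conditional expression keeps a freshly started server from crashing on its first `/stats` call, before any query has been processed.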
---
## ⚙️ Configuration Management
### Environment Variable Loading
```python
# Lines 7-9: Dynamic .env file path resolution
dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
load_dotenv(dotenv_path)
```
**Explanation:**
- **Line 7**: Apply `os.path.dirname` three times to walk up from the file's location
- **Line 8**: Load environment variables from .env file
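The mechanics of the nested `dirname` calls can be demonstrated on a hypothetical path (the directory layout below is assumed for illustration, not taken from the project):

```python
import os

# Hypothetical value of __file__ for a server nested two levels below the project root
path = "/srv/app/src/custom_mcp/server.py"
root = os.path.dirname(os.path.dirname(os.path.dirname(path)))
# dirname #1 -> /srv/app/src/custom_mcp, #2 -> /srv/app/src, #3 -> /srv/app
dotenv_path = os.path.join(root, ".env")
```

Each `dirname` call strips one trailing path component, so three calls map a file to the directory two levels above its own.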
### API Key Validation
```python
# Lines 8-12: Secure API key handling
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
    logging.error("GEMINI_API_KEY is not set in the environment!")
    raise RuntimeError("Missing GEMINI_API_KEY")
client = genai.Client(api_key=api_key)
```
**Explanation:**
- **Line 8**: Retrieve API key from environment
- **Lines 9-11**: Validate API key exists and raise error if missing
- **Line 12**: Initialize Gemini client with validated API key
### YAML Configuration Loading
```python
# Lines 17-21: YAML configuration loading
cfg_path = os.path.join(os.path.dirname(__file__), "agent_config.yaml")
with open(cfg_path) as f:
    cfg = yaml.safe_load(f)
logging.info("Loaded config: %s", cfg)
```
**Explanation:**
- **Line 17**: Construct path to configuration file
- **Lines 18-19**: Load YAML configuration safely
- **Line 20**: Log loaded configuration for verification
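The contents of `agent_config.yaml` are not shown in the snippets; based on the `cfg["model"]` lookup in the `/ask` handler, a minimal file might look like this (illustrative only):

```yaml
# agent_config.yaml -- hypothetical contents
model: gemini-2.5-flash
```

`yaml.safe_load` parses this into a plain dict (`{"model": "gemini-2.5-flash"}`) without executing arbitrary Python tags, which is why it is preferred over `yaml.load` for config files.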
---
## Error Handling Patterns
### Try-Catch with Statistics
```python
# Lines 53-70: Comprehensive error handling
try:
    resp = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt
    )
    output = resp.text
    # ... success handling ...
except Exception as e:
    logging.exception("Gemini call failed")
    with self.lock:
        self.queries_processed += 1
        self.failed_queries += 1
        self.total_response_time += (time.time() - start_time)
    return {"task_id": task_id, "error": str(e)}
```
**Explanation:**
- **Lines 53-57**: Attempt API call
- **Lines 64-65**: Log exception with full traceback
- **Lines 66-69**: Update failure statistics in thread-safe manner
- **Line 70**: Return error response with task ID
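The core of the pattern, converting an exception into a structured error response after logging the full traceback, can be distilled as follows (`safe_call` is a hypothetical helper, not part of the project):

```python
import logging

logging.basicConfig(level=logging.INFO)

def safe_call(fn):
    """Turn an exception into an error dict, logging the full traceback first."""
    try:
        return {"result": fn()}
    except Exception as e:
        logging.exception("call failed")  # logs the message plus the traceback
        return {"error": str(e)}

ok = safe_call(lambda: 42)         # -> {"result": 42}
bad = safe_call(lambda: 1 / 0)     # -> {"error": "division by zero"}
```

`logging.exception` must be called from inside an `except` block; it records the active exception's traceback at ERROR level, so the client gets a clean JSON error while the server log keeps the full diagnostic detail.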
### Task Validation
```python
# Lines 37-42: Task existence validation
def run(self, task_id: str) -> dict:
    if task_id not in self.tasks:
        logging.error("Task %s not found", task_id)
        with self.lock:
            self.failed_queries += 1
        return {"error": "Task not found"}
```
**Explanation:**
- **Line 38**: Check if task exists in storage
- **Line 39**: Log error for missing task
- **Lines 40-41**: Update failure statistics
- **Line 42**: Return error response
---
## 🧵 Threading and Concurrency
### Thread-Safe Statistics
```python
# Lines 16-17, 27: Threading setup
import threading
import time

class MCPController:
    def __init__(self):
        # ... other initialization ...
        self.lock = threading.Lock()
```
**Explanation:**
- **Line 16**: Import threading module
- **Line 27**: Create lock for thread-safe operations
### Lock Usage Pattern
```python
# Lines 60-63: Consistent lock usage
with self.lock:
    self.queries_processed += 1
    self.successful_queries += 1
    self.total_response_time += (time.time() - start_time)
```
**Explanation:**
- **Line 60**: Use context manager for automatic lock release
- **Lines 61-63**: Update multiple statistics atomically
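Why the lock matters can be shown with a standalone demo (names are illustrative): several threads increment a shared counter, and the `with lock:` block guarantees no increments are lost.

```python
import threading

counter = {"queries_processed": 0}
lock = threading.Lock()

def worker():
    for _ in range(10_000):
        with lock:  # acquired here, released automatically -- even if an exception occurs
            counter["queries_processed"] += 1

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# With the lock, the final count is exactly 4 * 10_000;
# without it, concurrent read-modify-write updates can be lost.
```

Grouping the related counters under one lock, as the controller does, also keeps them mutually consistent: no reader can ever observe a state where one counter has advanced but its siblings have not.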
---
## Key Design Patterns
### 1. **Dependency Injection Pattern**
- Controller injected into Flask routes
- Client injected into controller
### 2. **Factory Pattern**
- UUID generation for unique task IDs
- Dynamic tool loading and execution
### 3. **Observer Pattern**
- Statistics tracking across all operations
- Real-time monitoring updates
### 4. **Strategy Pattern**
- Different server implementations (Custom vs Public)
- Pluggable tool system
### 5. **Singleton Pattern**
- Global statistics storage
- Single Gemini client instance
This explanation covers the major components, patterns, and implementation details of the MCP Agentic AI Server project, along with the architectural decisions behind them.